package org.qing.cloud.user.controller;

import org.qing.cloud.commons.utils.json.JSON;
import org.qing.cloud.user.entity.User;
import org.qing.cloud.user.service.UserService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * Demo WebFlux controller that reads users from {@link UserService} and mirrors
 * them into several Redis data structures through a {@link ReactiveRedisTemplate}.
 *
 * <p>Every handler returns a single reactive pipeline and never calls
 * {@code block()} or a detached {@code subscribe()}: the framework subscribes
 * exactly once, errors propagate to the HTTP response, and no request thread is
 * blocked while waiting for the database or Redis.
 */
@Slf4j
@RestController
public class UserController {

	@Autowired
	private UserService service;

	@Autowired
	private ReactiveRedisTemplate<String, Object> redisTemplate;

	/**
	 * Returns a fixed greeting; useful as a smoke test of the reactive stack.
	 *
	 * @return a Mono emitting {@code "Hello webflux"}
	 */
	@RequestMapping("/get/one")
	public Mono<String> getOne() {
		return Mono.just("Hello webflux");
	}

	/**
	 * Streams all users returned by {@link UserService#findAll()}, logging each
	 * one as JSON when it is emitted.
	 *
	 * @return the users produced by the service
	 */
	@RequestMapping("/get/all")
	public Flux<User> findAll() {
		// Log as a side effect of the returned pipeline instead of subscribing
		// separately: a second subscription would run the source a second time.
		return service.findAll()
				.doOnNext(user -> log.info("user=[{}]", JSON.toJSONString(user)));
	}

	/**
	 * Writes every user into the {@code OPSFOR*} Redis keys, then returns the
	 * full content of the {@code OPSFORLIST_ALL} list.
	 *
	 * <p>Users are processed concurrently; for a single user the five Redis
	 * writes run one after another. The list is read only after all writes for
	 * all users have completed.
	 *
	 * @return the elements of {@code OPSFORLIST_ALL} after the writes finished
	 */
	@RequestMapping("/get/redis")
	public Flux<Object> getRedis() {
		return service.findAll()
				.flatMap(this::cacheUserSequentially)
				// thenMany waits for completion of every write before reading back.
				.thenMany(Flux.defer(() -> readList("OPSFORLIST_ALL", "SIZE=[{}]")));
	}

	/**
	 * Writes every user into the {@code ASYN_OPSFOR*} Redis keys, then returns
	 * the full content of the {@code ASYN_OPSFORLIST_ALL} list.
	 *
	 * <p>Users are processed concurrently and the five Redis writes of each user
	 * are also issued concurrently. The list is read only after all of them
	 * have completed, so the response reflects the writes of this request.
	 *
	 * @return the elements of {@code ASYN_OPSFORLIST_ALL} after the writes finished
	 */
	@RequestMapping("/get/asyn/redis")
	public Flux<Object> getAsynRedis() {
		return service.findAll()
				.flatMap(this::cacheUserConcurrently)
				.thenMany(Flux.defer(() -> readList("ASYN_OPSFORLIST_ALL", "ASYN_SIZE=[{}]")));
	}

	/**
	 * Stores one user under the {@code OPSFOR*} keys (value, list, set, hash,
	 * sorted set), issuing each write after the previous one completed.
	 *
	 * @param user the user to store
	 * @return a Mono that completes when all five writes are done
	 */
	private Mono<Void> cacheUserSequentially(User user) {
		return redisTemplate.opsForValue().set("OPSFORVALUE_" + user.getId(), user)
				.doOnNext(status -> log.info("OPSFORVALUE_STATUS=[{}]", status))
				.then(redisTemplate.opsForList().rightPush("OPSFORLIST_ALL", user))
				.doOnNext(status -> log.info("OPSFORLIST_STATUS=[{}]", status))
				.then(redisTemplate.opsForSet().add("OPSFORSET_ALL", user))
				.doOnNext(status -> log.info("OPSFORSET_STATUS=[{}]", status))
				.then(redisTemplate.opsForHash().put("OPSFORHASH_ALL", user.getId(), user))
				.doOnNext(status -> log.info("OPSFORHASH_STATUS=[{}]", status))
				// The user id is used as the score increment, as in the original code.
				.then(redisTemplate.opsForZSet().incrementScore("OPSFORZSET_ALL", user, user.getId()))
				.doOnNext(status -> log.info("OPSFORZSET_STATUS=[{}]", status))
				.then();
	}

	/**
	 * Stores one user under the {@code ASYN_OPSFOR*} keys (value, list, set,
	 * hash, sorted set), issuing all five writes at once.
	 *
	 * @param user the user to store
	 * @return a Mono that completes when all five writes are done, or fails
	 *         with the first error raised by any of them
	 */
	private Mono<Void> cacheUserConcurrently(User user) {
		Mono<Boolean> value = redisTemplate.opsForValue()
				.set("ASYN_OPSFORVALUE_" + user.getId(), user)
				.doOnNext(s -> log.info("ASYN_OPSFORVALUE_STATUS=[{}]", s));

		Mono<Long> list = redisTemplate.opsForList()
				.rightPush("ASYN_OPSFORLIST_ALL", user)
				.doOnNext(s -> log.info("ASYN_OPSFORLIST_STATUS=[{}]", s));

		Mono<Long> set = redisTemplate.opsForSet()
				.add("ASYN_OPSFORSET_ALL", user)
				.doOnNext(s -> log.info("ASYN_OPSFORSET_STATUS=[{}]", s));

		Mono<Boolean> hash = redisTemplate.opsForHash()
				.put("ASYN_OPSFORHASH_ALL", user.getId(), user)
				.doOnNext(s -> log.info("ASYN_OPSFORHASH_STATUS=[{}]", s));

		// The user id is used as the score increment, as in the original code.
		Mono<Double> zset = redisTemplate.opsForZSet()
				.incrementScore("ASYN_OPSFORZSET_ALL", user, user.getId())
				.doOnNext(s -> log.info("ASYN_OPSFORZSET_STATUS=[{}]", s));

		// Mono.when subscribes to all sources eagerly and completes when all complete.
		return Mono.when(value, list, set, hash, zset);
	}

	/**
	 * Logs the length of a Redis list and then emits all of its elements.
	 *
	 * @param key           the Redis list key
	 * @param sizeLogFormat SLF4J format string used to log the list length
	 * @return the elements at indexes {@code 0 .. size - 1}; empty if the size
	 *         lookup emits nothing
	 */
	private Flux<Object> readList(String key, String sizeLogFormat) {
		return redisTemplate.opsForList().size(key)
				.doOnNext(size -> log.info(sizeLogFormat, size))
				.flatMapMany(size -> redisTemplate.opsForList().range(key, 0, size - 1));
	}

}
